/*
 * Unidata Platform
 * Copyright (c) 2013-2020, UNIDATA LLC, All rights reserved.
 *
 * Commercial License
 * This version of Unidata Platform is licensed commercially and is the appropriate option for the vast majority of use cases.
 *
 * Please see the Unidata Licensing page at: https://unidata-platform.com/license/
 * For clarification or additional options, please contact: info@unidata-platform.com
 * -------
 * Disclaimer:
 * -------
 * THIS SOFTWARE IS DISTRIBUTED "AS-IS" WITHOUT ANY WARRANTIES, CONDITIONS AND
 * REPRESENTATIONS WHETHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION THE
 * IMPLIED WARRANTIES AND CONDITIONS OF MERCHANTABILITY, MERCHANTABLE QUALITY,
 * FITNESS FOR A PARTICULAR PURPOSE, DURABILITY, NON-INFRINGEMENT, PERFORMANCE AND
 * THOSE ARISING BY STATUTE OR FROM CUSTOM OR USAGE OF TRADE OR COURSE OF DEALING.
 */
package org.unidata.mdm.dq.data.service.impl.function.data;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.Range;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.springframework.beans.factory.annotation.Autowired;
import org.unidata.mdm.core.service.MetaModelService;
import org.unidata.mdm.core.type.data.Attribute;
import org.unidata.mdm.core.type.model.EntityElement;
import org.unidata.mdm.core.type.model.LookupElement;
import org.unidata.mdm.data.context.UpsertRequestContext;
import org.unidata.mdm.data.type.keys.RecordKeys;
import org.unidata.mdm.dq.core.context.CleanseFunctionContext;
import org.unidata.mdm.dq.core.dto.CleanseFunctionResult;
import org.unidata.mdm.dq.core.exception.CleanseFunctionExecutionException;
import org.unidata.mdm.dq.core.service.impl.function.system.AbstractSystemCleanseFunction;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionConfiguration;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionExecutionScope;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionInputParam;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionOutputParam;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionPortFilteringMode;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionPortInputType;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionPortValueType;
import org.unidata.mdm.dq.core.type.constant.CleanseConstants;
import org.unidata.mdm.dq.core.type.io.DataQualitySpot;
import org.unidata.mdm.meta.configuration.Descriptors;
import org.unidata.mdm.meta.type.search.EntityIndexType;
import org.unidata.mdm.meta.type.search.RecordHeaderField;
import org.unidata.mdm.meta.util.ValidityPeriodUtils;
import org.unidata.mdm.search.context.SearchRequestContext;
import org.unidata.mdm.search.dto.SearchResultDTO;
import org.unidata.mdm.search.dto.SearchResultHitDTO;
import org.unidata.mdm.search.dto.SearchResultHitFieldDTO;
import org.unidata.mdm.search.service.SearchService;
import org.unidata.mdm.search.type.form.FieldsGroup;
import org.unidata.mdm.search.type.form.FormField;
import org.unidata.mdm.search.type.query.SearchQuery;
import org.unidata.mdm.search.type.sort.SortField;
import org.unidata.mdm.search.type.sort.SortField.SortOrder;
import org.unidata.mdm.search.util.SearchUtils;
import org.unidata.mdm.system.util.TextUtils;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;

/**
 * The Class CFDataConsistency.
 */
public class CheckRecordConsistency extends AbstractSystemCleanseFunction {
    /**
     * Display name code.
     */
    private static final String FUNCTION_DISPLAY_NAME = "app.dq.functions.data.check.consistency.display.name";
    /**
     * Description code.
     */
    private static final String FUNCTION_DESCRIPTION = "app.dq.functions.data.check.consistency.decsription";
    /**
     * IP1 name code.
     */
    private static final String INPUT_PORT_1_NAME = "app.dq.functions.data.check.consistency.input.port1.name";
    /**
     * IP1 description code.
     */
    private static final String INPUT_PORT_1_DESCRIPTION = "app.dq.functions.data.check.consistency.input.port1.decsription";
    /**
     * IP2 name code.
     */
    private static final String INPUT_PORT_2_NAME = "app.dq.functions.data.check.consistency.input.port2.name";
    /**
     * IP2 description code.
     */
    private static final String INPUT_PORT_2_DESCRIPTION = "app.dq.functions.data.check.consistency.input.port2.decsription";
    /**
     * OP1 name code.
     */
    private static final String OUTPUT_PORT_1_NAME = "app.dq.functions.data.check.consistency.output.port1.name";
    /**
     * OP1 description code.
     */
    private static final String OUTPUT_PORT_1_DESCRIPTION = "app.dq.functions.data.check.consistency.output.port1.decsription";
    /**
     * OP2 name code.
     */
    private static final String OUTPUT_PORT_2_NAME = "app.dq.functions.data.check.consistency.output.port2.name";
    /**
     * OP2 description code.
     */
    private static final String OUTPUT_PORT_2_DESCRIPTION = "app.dq.functions.data.check.consistency.output.port2.decsription";
    /**
     * This function configuration.
     */
    private static final CleanseFunctionConfiguration CONFIGURATION
        = CleanseFunctionConfiguration.configuration()
            .supports(CleanseFunctionExecutionScope.GLOBAL, CleanseFunctionExecutionScope.LOCAL)
            .input(CleanseFunctionConfiguration.port()
                    .name(CleanseConstants.INPUT_PORT_1)
                    .displayName(() -> TextUtils.getText(INPUT_PORT_1_NAME))
                    .description(() -> TextUtils.getText(INPUT_PORT_1_DESCRIPTION))
                    .filteringMode(CleanseFunctionPortFilteringMode.MODE_ALL)
                    .inputTypes(CleanseFunctionPortInputType.SIMPLE, CleanseFunctionPortInputType.ARRAY, CleanseFunctionPortInputType.CODE)
                    .valueTypes(CleanseFunctionPortValueType.STRING, CleanseFunctionPortValueType.INTEGER)
                    .required(false)
                    .build())
            .input(CleanseFunctionConfiguration.port()
                    .name(CleanseConstants.INPUT_PORT_2)
                    .displayName(() -> TextUtils.getText(INPUT_PORT_2_NAME))
                    .description(() -> TextUtils.getText(INPUT_PORT_2_DESCRIPTION))
                    .filteringMode(CleanseFunctionPortFilteringMode.MODE_ALL)
                    .inputTypes(CleanseFunctionPortInputType.SIMPLE)
                    .valueTypes(CleanseFunctionPortValueType.STRING)
                    .required(true)
                    .build())
            .output(CleanseFunctionConfiguration.port()
                    .name(CleanseConstants.OUTPUT_PORT_1)
                    .displayName(() -> TextUtils.getText(OUTPUT_PORT_1_NAME))
                    .description(() -> TextUtils.getText(OUTPUT_PORT_1_DESCRIPTION))
                    .filteringMode(CleanseFunctionPortFilteringMode.MODE_ALL)
                    .inputTypes(CleanseFunctionPortInputType.SIMPLE)
                    .valueTypes(CleanseFunctionPortValueType.BOOLEAN)
                    .required(true)
                    .build())
            .output(CleanseFunctionConfiguration.port()
                    .name(CleanseConstants.OUTPUT_PORT_2)
                    .displayName(() -> TextUtils.getText(OUTPUT_PORT_2_NAME))
                    .description(() -> TextUtils.getText(OUTPUT_PORT_2_DESCRIPTION))
                    .filteringMode(CleanseFunctionPortFilteringMode.MODE_ALL)
                    .inputTypes(CleanseFunctionPortInputType.SIMPLE)
                    .valueTypes(CleanseFunctionPortValueType.STRING)
                    .required(true)
                    .build())
            .build();
    /**
     * Instantiates a new cleanse function abstract.
     */
    public CheckRecordConsistency() {
        super("CheckRecordConsistency", () -> TextUtils.getText(FUNCTION_DISPLAY_NAME), () -> TextUtils.getText(FUNCTION_DESCRIPTION));
    }
    /**
     * Search service.
     */
    @Autowired
    private SearchService searchService;
    /**
     * Model Service.
     */
    @Autowired
    private MetaModelService modelService;
    /**
     * {@inheritDoc}
     */
    @Override
    public CleanseFunctionConfiguration configure() {
        return CONFIGURATION;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public CleanseFunctionResult execute(CleanseFunctionContext ctx) {

        CleanseFunctionResult result = new CleanseFunctionResult();

        CleanseFunctionInputParam param1 = ctx.getInputParam(CleanseConstants.INPUT_PORT_1);
        CleanseFunctionInputParam param2 = ctx.getInputParam(CleanseConstants.INPUT_PORT_2);

        // 1. Invalid input - lookup entity name is null
        // 2. Fail on wrong configuration. Lookup entity not found by name.
        LookupElement lookupElement = null;
        if (Objects.isNull(param2) || param2.isEmpty() || StringUtils.isBlank(param2.toSingletonValue())) {
            throw new CleanseFunctionExecutionException(ctx.getFunctionName(),
                    TextUtils.getText("app.cleanse.validation.invalid.parameters"));
        } else {

            lookupElement = modelService.instance(Descriptors.DATA).getLookup(param2.toSingletonValue());
            if (lookupElement == null) {
                throw new CleanseFunctionExecutionException(ctx.getFunctionName(),
                        TextUtils.getText("app.cleanse.validation.lookupEntity.notExist", param2.toSingletonValue()));
            }
        }

        // 3. Extract values, exit on empty
        Map<Object, List<Attribute>> check = extractAndMapAttributes(param1);
        if (check.isEmpty()) {
            result.putOutputParam(CleanseFunctionOutputParam.of(CleanseConstants.OUTPUT_PORT_1, Boolean.TRUE));
            result.putOutputParam(CleanseFunctionOutputParam.of(CleanseConstants.OUTPUT_PORT_2, StringUtils.EMPTY));
            return result;
        }

        // 4. Check payload, which must be an instance of UpsertRequestContext
        if (Objects.isNull(ctx.getPayload()) || !(ctx.getPayload() instanceof UpsertRequestContext)) {
            result.putOutputParam(CleanseFunctionOutputParam.of(CleanseConstants.OUTPUT_PORT_1, Boolean.FALSE));
            result.putOutputParam(CleanseFunctionOutputParam.of(CleanseConstants.OUTPUT_PORT_2, createMessageString("app.cleanse.validation.payload.missing")));
            return result;
        }

        UpsertRequestContext uCtx = (UpsertRequestContext) ctx.getPayload();
        RecordKeys keys = uCtx.keys();
        Date factoryFrom = uCtx.getValidFrom();
        Date factoryTo = uCtx.getValidTo();

        // 5. Collect current values and compare them with input. Report missing.
        Map<Object, List<Range<Date>>> current = collectCurrentState(lookupElement, check);
        Pair<Set<Object>, Set<Attribute>> failed = collectMissingValues(check, current);

        // 6. Stop on missing values in links
        if (CollectionUtils.isNotEmpty(failed.getKey())) {
            result.putOutputParam(CleanseFunctionOutputParam.of(CleanseConstants.OUTPUT_PORT_1, Boolean.FALSE));
            result.putOutputParam(CleanseFunctionOutputParam.of(CleanseConstants.OUTPUT_PORT_2, createMessageString("app.cleanse.validation.lookupEntityRecord.notExist", failed.getKey().toString())));
            result.addSpots(createPathCollection(failed.getValue()));
            return result;
        }

        // 7. Check ranges of referenced values
        String entityName = keys.getEntityName();
        if(StringUtils.isBlank(entityName)){
            entityName = lookupElement.getName();
        }

        Date from = factoryFrom != null ? factoryFrom : defaultStart(entityName);
        Date to = factoryTo == null ? factoryTo : defaultEnd(entityName);

        Range<Date> range = Range.between(from, to);

        failed = collectFailedRanges(check, current, range);

        boolean isOk = failed.getKey().isEmpty();
        result.putOutputParam(CleanseFunctionOutputParam.of(CleanseConstants.OUTPUT_PORT_1, isOk));
        result.putOutputParam(CleanseFunctionOutputParam.of(CleanseConstants.OUTPUT_PORT_2, isOk
                ? StringUtils.EMPTY
                : createMessageString("app.cleanse.validation.lookupEntityRecord.notOverlapped", failed.getKey().toString())));

        if (!isOk) {
            result.addSpots(createPathCollection(failed.getValue()));
        }

        return result;
    }
    /**
     * Creates search request and executes it, returning preprocessed response.
     * @param lookupEntityDef lookup entity definition
     * @param values the values map
     * @return search result as multimap
     */
    private Map<Object, List<Range<Date>>> collectCurrentState(LookupElement lookupElement, Map<Object, List<Attribute>> values) {

        // 1. Request
        SearchRequestContext ctx = SearchRequestContext.builder(EntityIndexType.RECORD, lookupElement.getName())
                .query(SearchQuery.formQuery(
                    FieldsGroup.and(
                        FormField.exact(RecordHeaderField.FIELD_DELETED, Boolean.FALSE),
                        FormField.exact(RecordHeaderField.FIELD_INACTIVE, Boolean.FALSE),
                        FormField.exact(lookupElement.getCodeAttribute().getIndexed(), values.keySet()))))
                .returnFields(Arrays.asList(
                        RecordHeaderField.FIELD_TO.getName(),
                        RecordHeaderField.FIELD_FROM.getName(),
                        lookupElement.getCodeAttribute().getName()))
                .sorting(Collections.singleton(SortField.of(RecordHeaderField.FIELD_FROM, SortOrder.ASC)))
                .page(0)
                .count(Integer.MAX_VALUE)
                .totalCount(true)
                .build();

        // 2. Execute
        SearchResultDTO searchResult = searchService.search(ctx);

        // 3. Post-process
        if (CollectionUtils.isEmpty(searchResult.getHits())) {
            return Collections.emptyMap();
        }

        Map<Object, List<Range<Date>>> ranges = new HashMap<>();
        for (SearchResultHitDTO hit : searchResult.getHits()) {

            Range<Date> range = getDateRange(hit);
            if (Objects.isNull(range)) {
                continue;
            }

            List<Object> codeValues = hit.getFieldValues(lookupElement.getCodeAttribute().getName());
            if (CollectionUtils.isNotEmpty(codeValues)) {
                codeValues.forEach(codeValue -> ranges.computeIfAbsent(codeValue instanceof Number
                            ? ((Number) codeValue).longValue()
                            : codeValue.toString(), key -> new ArrayList<>())
                        .add(range));
            }
        }

        for (Iterator<Entry<Object, List<Range<Date>>>> vi = ranges.entrySet().iterator(); vi.hasNext(); ) {
            Entry<Object, List<Range<Date>>> val = vi.next();
            List<Range<Date>> merged = mergeRanges(val.getValue());
            val.getValue().clear();
            val.getValue().addAll(merged);
        }

        return ranges;
    }
    /**
     * Gets the not presented values.
     *
     * @param check the values to check
     * @param current the current state
     * @return the not present values
     */
    private Pair<Set<Object>, Set<Attribute>> collectMissingValues(
            Map<Object, List<Attribute>> check, Map<Object, List<Range<Date>>> current) {

        Set<Object> missingValues = new HashSet<>();
        Set<Attribute> failedAttributes = new HashSet<>();
        for (Entry<Object, List<Attribute>> checkEntry : check.entrySet()) {
            if (!current.containsKey(checkEntry.getKey())) {
                missingValues.add(checkEntry.getKey());
                failedAttributes.addAll(checkEntry.getValue());
            }
        }

        return new ImmutablePair<>(missingValues, failedAttributes);
    }
    /**
     * Collects failed ranges.
     * @param check the values to check
     * @param current current values
     * @param range the record range
     * @return failed
     */
    @Nonnull
    private Pair<Set<Object>, Set<Attribute>> collectFailedRanges(
            Map<Object, List<Attribute>> check, Map<Object, List<Range<Date>>> current, Range<Date> range) {

        Set<Object> missingValues = new HashSet<>();
        Set<Attribute> failedAttributes = new HashSet<>();
        for (Entry<Object, List<Attribute>> checkEntry : check.entrySet()) {

            List<Range<Date>> ranges = current.get(checkEntry.getKey());
            boolean isCovered = false;
            for (Range<Date> test : ranges) {
                isCovered = test.containsRange(range);
                if (isCovered) {
                    break;
                }
            }

            if (!isCovered) {
                missingValues.add(checkEntry.getKey());
                failedAttributes.addAll(checkEntry.getValue());
            }
        }

        return new ImmutablePair<>(missingValues, failedAttributes);
    }
    /**
     * Collects paths of attributes.
     * @param attributes the attributes collection
     * @return paths collection
     */
    private Collection<DataQualitySpot> createPathCollection(Collection<Attribute> attributes) {
        return CollectionUtils.isEmpty(attributes)
                ? Collections.emptyList()
                : attributes.stream()
                    .map(attr -> new DataQualitySpot(attr.toLocalPath(), attr))
                    .collect(Collectors.toList());
    }
    /**
     * Creates output message string.
     * @param message the message code
     * @param args arguments
     * @return message
     */
    private String createMessageString(String message, Object... args) {
        return TextUtils.getText(message, args);
    }
    /**
     * Collect ranges.
     *
     * @param searchResult - search result
     * @return multi map, where key it is a lookup record etalon key and value is collection of record ranges
     */
    @Nonnull
    private Multimap<String, Range<Date>> collectRanges(SearchResultDTO searchResult) {
        int approximateRangesSize = 3;
        int hits = searchResult.getHits().size();
        Multimap<String, Range<Date>> ranges = HashMultimap.create(hits, approximateRangesSize);
        for (SearchResultHitDTO hit : searchResult.getHits()) {
            Range<Date> range = getDateRange(hit);
            if (Objects.isNull(range)) {
                continue;
            }
            ranges.put(hit.getId(), range);
        }

        for (String etalonId : new ArrayList<>(ranges.keySet())) {
            List<Range<Date>> recordRanges = new ArrayList<>(ranges.removeAll(etalonId));
            ranges.putAll(etalonId, mergeRanges(recordRanges));
        }

        return ranges;
    }

    /**
     * Merge ranges.
     *
     * @param lookupRanges - unmerged ranges
     * @return collection of merged ranges
     */
     private List<Range<Date>> mergeRanges(List<Range<Date>> lookupRanges){
         // Guard against clear -> addAll
         if (lookupRanges.size() < 2) {
            return lookupRanges.isEmpty()
                    ? Collections.emptyList()
                    : Collections.singletonList(lookupRanges.get(0));
         }

         //merge ranges
         List<Range<Date>> mergedRanges = new ArrayList<>(lookupRanges.size());
         lookupRanges.sort((o1, o2) -> {

             if (o1.getComparator() == null && o2.getMinimum() == null) {
                 return 0;
             } else if (o1.getMinimum() == null) {
                 return -1;
             } else if (o2.getMinimum() == null) {
                 return +1;
             }

             return o1.getMinimum().compareTo(o2.getMinimum());
        });

        Iterator<Range<Date>> iterator = lookupRanges.iterator();
        Range<Date> initialRange = iterator.next();
        while (iterator.hasNext()) {
            Range<Date> nextRange = iterator.next();
            Range<Date> mergedRange = getMergedRange(initialRange, nextRange);
            if (mergedRange == null) {
                mergedRanges.add(initialRange);
                initialRange = nextRange;
            } else {
                initialRange = mergedRange;
            }
        }

        mergedRanges.add(initialRange);
        return mergedRanges;
    }

    /**
     * Gets the merged range.
     *
     * @param first the first
     * @param second the second
     * @return the merged range
     */
    @Nullable
    private Range<Date> getMergedRange(@Nonnull Range<Date> first, @Nonnull Range<Date> second) {
        long to = first.getMaximum().getTime();
        long from = second.getMinimum().getTime();
        boolean isRangeOverlapped = from - to <= TimeUnit.MILLISECONDS.toMillis(1);
        if (isRangeOverlapped) {
            return Range.between(first.getMinimum(), second.getMaximum());
        } else {
            return null;
        }
    }

    /**
     * Gets the date range.
     *
     * @param hit the hit
     * @return the date range
     */
    @Nullable
    private Range<Date> getDateRange(SearchResultHitDTO hit) {

        SearchResultHitFieldDTO to = hit.getFieldValue(RecordHeaderField.FIELD_TO.getName());
        SearchResultHitFieldDTO from = hit.getFieldValue(RecordHeaderField.FIELD_FROM.getName());

        try {

            Date validTo = to == null || to.isNullField()
                    ? SearchUtils.ES_MAX_DATE
                    : Date.from(Instant.parse(to.getFirstValue().toString()));

            Date validFrom = from == null || from.isNullField()
                    ? SearchUtils.ES_MIN_DATE
                    : Date.from(Instant.parse(from.getFirstValue().toString()));

            return Range.between(validFrom, validTo);
        } catch (Exception e) {
            return null;
        }
    }
    /**
     * Default start.
     *
     * @param entityName the entity name
     * @return the date
     */
    private Date defaultStart(String entityName) {

        if (StringUtils.isBlank(entityName)) {
            return null;
        }

        EntityElement eel = modelService.instance(Descriptors.DATA)
            .getElement(entityName);

        if (eel.isValidityPeriod()
         && eel.getValidityPeriod().isDefined()
         && eel.getValidityPeriod().getFrom() != null) {

            return eel.getValidityPeriod().getFrom();
        }

        Date retval = ValidityPeriodUtils.getGlobalValidityPeriodStart();
        return retval == null ? SearchUtils.ES_MIN_DATE : retval;
    }

    /**
     * Default end.
     *
     * @param entityName the entity name
     * @return the date
     */
    private Date defaultEnd(String entityName) {

        if (StringUtils.isBlank(entityName)) {
            return null;
        }

        EntityElement eel = modelService.instance(Descriptors.DATA)
            .getElement(entityName);

        if (eel.isValidityPeriod()
         && eel.getValidityPeriod().isDefined()
         && eel.getValidityPeriod().getTo() != null) {

            return eel.getValidityPeriod().getTo();
        }

        Date retval = ValidityPeriodUtils.getGlobalValidityPeriodEnd();
        return retval == null ? SearchUtils.ES_MAX_DATE : retval;
    }
}
